Fork me on GitHub

Java并发编程之美

注意:所有文章除特别说明外,转载请注明出处.

Java并发编程之美

[TOC]

第1章 基础

第2章 基础扩展

第3章 Java并发包ThreadLocalRandom类原理剖析

该类是JUC包下新增的随机数生成器,弥补Random类在多线程下的缺陷。

3.1 Random类的局限性

每个Random实例里面都有一个原子性的种子变量用来记录当前的种子值,当要生成新的随机数时需要根据当前种子计算新的种子并更新回原子变量。

在多线程环境下使用单个Random实例生成随机数时,当多个线程同时计算随机数来计算新的种子时,多个线程会竞争同一个原子变量的更新操作,由于原子变量的更新是CAS操作,同时只有一个线程会成功,所以会造成大量线程进行自旋重试,会降低并发性能。

3.2 ThreadLocalRandom

package com.ccpc.edu.xidian.cn.ccpc.example.random;

import java.util.concurrent.ThreadLocalRandom;

public class example1 {

    public static void main(String[] args) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int i = 0; i < 10; i++){
            System.out.println(random.nextInt(5));
        }
    }
}

ThreadLocalRandom random = ThreadLocalRandom.current();来获取当前线程的随机数生成器。

==ThreadLocalRandom 可以被联想到ThreadLocal(通过让每一个线程复制一份变量,使得每一个线程对变量进行操作时实际是操作自己本地内存的副本,从而避免对共享变量进行同步)。ThreadLocalRandom 的实现也是如此。==

提示:Random类的缺点是多个线程会使用同一个原子性种子变量,从而导致对原子变量更新的竞争。

3.3 源码分析

ThreadLocalRandom 类似于一个ThreadLocal类,就是个工具类。当线程调用ThreadLocalRandom负责初始化调用线程的threadLocalRandomSeed变量,也就是初始化种子。

当多线程通过ThreadLocalRandom random = ThreadLocalRandom.current();获取ThreadLocalRandom 实例时,其实获取的是同一个实例。但是由于具体的种子是存放在线程里面的,所以在ThreadLocalRandom 实例里面只包含与线程无关的通用算法,是线程安全的。

3.4 总结

ThreadLocalRandom 使用ThreadLocal的原理,让每个线程都持有一本地种子变量,该种子变量只有在使用随机数时才会被初始化。在多线程下==计算新种子时时根据自己线程内维护的种子变量进行更新,从而避免竞争。==


第4章 Java 并发包中原子操作类原理剖析

JUC包提供一系列原子性操作类,这些类都使用非阻塞算法CAS实现的,相比锁实现原子性操作在性能上有很大的提升。

4.1 原子变量操作类

JUC并发包包含有 AtomicInteger、AtomicLong等。它们原理相似。

AtomicLong是原子性递增或递减类,其内部使用Unsafe来实现。

因为AtomincLong类也是在rt.jar包下面,AtomicLong就是通过BootStrap类加载器进行加载的。所以能够通过Unsafe.getUnsafe()方法获取Unsafe的实例。

4.2 LongAdder

AtomicLong通过CAS提供了非阻塞的原子性操作,相比于使用阻塞算法的同步器来说它好。

在高并发下大量线程会同时去竞争更新同一个原子变量,但是由于同时只有一个线程的CAS操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试CAS操作,这就会白白浪费CPU资源。

在JDK8中使用LongAdder用来克服在高并发下使用AtomicLong的缺点。

在LongAdder内部维护多个Cell变量,每个Cell变量里面有一个初始值为0的long型变量。在这种情况下,相对AtomicLong来说争夺单个变量更新操作的线程量会减少,所以变相的减少了争夺共享资源的并发量。

另外,多个线程在争夺同一个Cell原子变量时如果失败,并不是在当前Cell变量上一直自旋CAS重试,而是尝试在其它Cell的变量上进行CAS尝试,这改变增加了当前线程CAS成功的可能性。

最后在获取LongAdder当前值时,是将所有Cell变量的value值累加后再加上base返回的。

提示:LongAdder维护了一个延迟初始化的原子性更新数组(默认情况下Cell数组是null)和一个基值变量base。

4.3 LongAccumulator类

LongAdder是LongAccumulator的特例。

LongAccumulator相比于LongAdder可以为累加器提供非0的初始值,后者只能提供默认的0值。同时前者能够指定累加规则


第5章 Java并发包中并发List源码剖析

在并发包中只有CopyOnWriteArrayList,是一个线程安全的ArrayList,对其进行的修改操作都是在底层的一个复制的数组(快照)上进行的,也是使用了写时复制策略。

提示:在CopyOnWriteArrayList中迭代器的弱一致性是指返回迭代器之后,其它线程对list的增删改对迭代器是不可见的。

5.3 总结

CopyOnWriteArrayList 使用写时复制的策略来保证list的一致性,而获取-修改-写入三步操作并不是原子性的,所以在增删改的过程中都使用独占锁,来保证某个时间只有一个线程能对list数组进行修改。

另外,CopyOnWriteArrayList提供了弱一致性的迭代器,从而保证在获取迭代器后,其它线程对list的修改是不可见的,迭代器遍历的数组是一个快照。


第6章 Java并发包中锁原理剖析

6.1 LockSupport工具类

JDK中的rt.jar包里面的LockSupport是个工具类,它的主要作用是唤醒和挂起线程,该工具类是创建锁和其它同步类的基础。

LockSupport 与每个使用它的线程都关联一个许可证,在默认情况下调用LockSupport类的方法的线程是不持有许可证的。

LockSupport 是使用Unsafe类实现的。

1. void park()方法

如果调用park()方法的线程已经拿到了与LockSupport关联的许可证,则调用LockSupport.park()时会马上返回,否则调用线程会被禁止参与线程的调度(被挂起)。

提示:因调用park()方法而被阻塞的线程被其它线程中断而返回时并不会抛出InterruptedException异常。

2. void unpark(Thread thread)方法

当一个线程调用unpark()时,如果参数thread线程没有持有与LockSupport类关联的许可证,则让thread线程持有。

如果thread之前因为调用park()而被挂起,在调用unpark()之后,该线程会被唤醒。

3. void parkNanos(long nanos)方法

该方法与park()方法类似,不同在于如果没有拿到许可证,则调用线程会被挂起nanos时间后修改为自动返回。

6.2 抽象同步队列AQS

1. AQS概述

它是实现同步器的基础组件,并发包中锁的底层就是使用AQS实现的。

6.3 独占锁 ReentrantLock 原理

1. 类图结构

ReentrantLock 是可重入独占锁,同时只能有一个线程可以获取该锁,其它获取该锁的线程会被阻塞而被放入该锁的AQS阻塞队列里面。

ReentrantLock 最终还是使用AQS实现,并且根据参数来决定内部是公平还是非公平锁,默认非公平锁。

6.4 读写锁 ReentrantReadWriteLock 的原理

解决线程安全使用ReentrantLock就可以,但是ReentrantLock是独占锁,某时只有一个线程可以获取该锁,而在实际中会有写少读多的场景。

ReentrantReadWriteLock 采用读写分离的策略,允许多个线程可以同时获取读锁。

ReentrantReadWriteLock 巧妙使用state的高16位表示读状态,也就是获取到读锁的次数。使用低16位表示获取到写锁的线程的可重入次数。

6.5 StampedLock


第7章 并发队列原理剖析

7.1 ConcurrentLinkedQueue 原理探究

ConcurrentLinkedQueue 是线程安全的无界非阻塞队列,底层使用单向链表实现,对入队和出队操作使用CAS来实现线程安全。

7.2 LinkedBlockingQueue 原理探究

该队列是使用独占锁实现的阻塞队列。

1. offer()

向队列尾部插入一个元素。

2. put()

向队列尾部插入一个元素,如果队列中有空闲则插入后直接返回,否则队列已满则阻塞当前线程,直到队列有空闲插入成功后返回。

3. poll()

从队列头部获取并移除一个元素,如果队列为空则返回null。该方法不是阻塞的。

4. peek()

….


第8章 Java并发包线程池ThreadPoolExecutor原理探究

线程池解决:1. 在执行大量异步任务时线程池能够提供较好的性能。2. 线程池提供了一种资源限制和管理的手段,可限制线程的个数,动态新增线程等。

8.3 源码分析

1. public void execute(Runnable command)

该方法作用是提交任务command到线程池进行执行。


第9章 Java并发包 ScheduledThreadPoolExecutor 原理

ThreadPoolExecutor只是Executors工具的部分功能。ScheduledThreadPoolExecutor 是一个可以在指定一定延迟时间后或者定时进行任务调度执行的线程池。

9.2 类图介绍

/**
 * Period in nanoseconds for repeating tasks.  A positive
 * value indicates fixed-rate execution.  A negative value
 * indicates fixed-delay execution.  A value of 0 indicates a
 * non-repeating task.
 */
private final long period;

在ScheduledFutureTask内部有一个变量period用来表示任务类型。

period = 0 表示一次性任务,执行完便退出。

period < 0 表示fixed-delay以固定延迟的定时可可重复执行任务

period > 0 表示fixed-rate以固定频率的定时可重复执行任务

9.3 原理剖析

1. schedule
/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();

    //任务转换
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));

    //添加任务到延迟队列
    delayedExecute(t);
    return t;
}

该方法的作用是提交一个延迟执行的任务,任务从提交时间算起,延迟单位为unit的delay时间后开始执行,提交的任务不是周期性任务,任务只会执行一次。

2. scheduleWithFixedDelay
/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 * @throws IllegalArgumentException   {@inheritDoc}
 */
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();

    //任务转换,注意这里是period=-delay<0
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

当任务执行完毕之后,让其延迟固定时间后再次运行。initialDelay表示提交任务后延迟多少时间来开始执行command,delay表示任务执行完后延长多少时间再次运行command任务,unit时间单位。

任务会一直运行直到任务运行中抛出异常被取消或者关闭线程池。

3. 总结

fixed-delay类型的任务执行原理是,==当添加一个任务到延迟队列后,等待initialDelay时间后,任务就会过期,过期的任务就会被从队列移除,并执行。== 执行完毕后,会重新设置任务的延迟时间,然后再将任务放入延迟队列,循环往复。

注意:如果一个任务在执行过程中抛出异常,那么这个任务就结束了,但是不影响其它任务的执行。

4. scheduleAtFixedRate
/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 * @throws IllegalArgumentException   {@inheritDoc}
 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

该方法是相对起始时间点以固定频率调用指定任务(fixed-rate任务)。当将任务提交到线程池并延迟initialDelay时间(时间单位为unit)后开始执行任务command。然后从initialDelay+period时间点再次执行,然后在initialDelay+2*period时间点再次执行,循环往复,直到抛出异常或调用了任务的cancel()方法取消任务或者关闭线程池。

scheduleAtFixedRate 原理与 scheduleWithFixedDelay 类似。

5. 总结

相对于fixed-delay任务来说,fixed-rate方式执行规则是,时间为 initialDelay+n*period 时间启动任务,但是如果当前任务还没执行完,下一次要执行任务的时间到了,并不会并发执行,下次要执行的任务会被延迟执行。

9.4 总结

ScheduledThreadPoolExecutor 的实现原理是,内部使用 DelayQueue 来存放具体任务。任务分为3种:1. 一次性执行任务,执行完毕之后就结束,fixed-delay任务保证同一个任务在多次执行之间间隔固定时间。2. fixed-rate任务保证按照固定的频率执行,任务类型施一公period的值来区分。


第10章 Java并发包中线程同步器原理剖析

10.1 CountDownLatch原理

在CountDownLatch出现之前一般都是使用join()方法来实现这一点,但是join()方法不够灵活,不能够满足不同场景的需求。

总结:CountDownLatch与join()方法区别:1. 调用一个子线程join()方法之后,会一直阻塞直到子线程运行完毕。2. countDownLatch使用计数器来允许子线程运行完毕或者在运行中递减计数,可以在子线程运行的任何时候让await()方法返回而不一定必须等到线程结束。 ==另外,使用线程池来管理线程时,一般都是直接添加Runnable到线程池,这时候就没法再次调用线程的join()方法。

1. void await()

在线程调用CountDownLatch对象的await()方法之后,当前线程会被阻塞,直到:1. 当所有线程都调用了CountDownLatch对象的countDown()方法后,计数器值为0。2. 其它线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出IE异常,然后返回。

2. boolean await(long timeout, TimeUnit unit)

在线程调用CountDownLatch对象的await()方法之后,当前线程会被阻塞,直到:1. 所有线程都调用了CountDownLatch对象的countDown方法后,计数器值为0时,返回true。2. 设置的timeout时间到了,因为超时返回false。3. 其它线程调用了当前线程的interrupt()方法中断了当前线程,当前线程抛出IE异常,然后返回。

3. void countDown()

线程调用该方法之后,计数器的值递减,递减后如果计数器值为0则唤醒所有因调用await()方法而被阻塞的线程,否则什么都不用做。

4. long getCount()

获取当前计数器的值,也就是AQS的state值,一般在测试时使用该方法。

总结

CountDownLatch是使用AQS的状态变量来存放计数器的值。在计数器值变为0时,当前线程还要调用AQS的doReleaseShared()方法来激活由于调用await()方法而被阻塞的线程。

10.2 回环屏障 CyclicBarrier 原理

因为CountDownLatch的计数器是一次性的,在等到计数器值变为0后,再调用CountDownLatch的await()和countDown()方法都会立刻返回,这就起不到线程同步的效果。

CyclicBarrier 的功能并不限于CountDownLatch的功能。它可以让一组线程全部达到一个状态后再全部同时执行。

线程在调用 await() 方法之后就会被阻塞,等到所有线程都调用了 await() 方法之后,线程们就会冲破屏障,继续向下运行。

/**
 * Creates a new {@code CyclicBarrier} that will trip when the
 * given number of parties (threads) are waiting upon it, and which
 * will execute the given barrier action when the barrier is tripped,
 * performed by the last thread entering the barrier.
 *
 * @param parties the number of threads that must invoke {@link #await}
 *        before the barrier is tripped
 * @param barrierAction the command to execute when the barrier is
 *        tripped, or {@code null} if there is no action
 * @throws IllegalArgumentException if {@code parties} is less than 1
 */
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
1. int await()

当前线程调用该方法时会被阻塞,直到满足条件:1. parties个线程都调用了await()方法,也就是线程到达屏障点。2. 其它线程调用了当前线程的interrupt()方法中断了当前线程,则当前线程抛出IE异常返回。返回。

2. boolean await(long timeout, TimeUnit unit)

当前线程调用该方法阻塞,直到满足条件返回:1. parties个线程都调用了await()方法返回true。2. 设置的超时时间到了后返回false。3. 其它线程调用当前线程的interrupt()方法中断当前线程,则抛出IE异常并返回。

3. int dowait(boolean timed, long nanos)
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        ...
        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
            ...
        }
    } finally {
        lock.unlock();
    }
}

当线程调用 dowait() 方法之后,首先会获取独占锁lock,如果在创建 CyclicBarrier 时传递的参数是10,那么后面9个调用线程会被阻塞。然后获取到锁的线程会对计数器count进行递减操作。

10.3 信号量 Semaphore 原理

它内部的计数器是递增的,并且在一开始初始化 Semaphore 时可以指定一个初始值,但不需要知道需要同步的线程个数,而是在需要同步的地方调用 acquire() 方法时指定需要同步的线程个数。

提示:如果在构建Semaphore时传递的参数N,并在M个线程中调用了该信号量的release()方法,那么在调用acquire()使M个线程同步时传递的参数应该是M+N。

1. void acquire()

当前线程调用该方法的目的是希望获取一个信号量资源。

如果当前信号量个数大于0,则当前信号量的计数会减1,然后该方法直接返回。否则如果当前信号量个数等于0,则当前线程会被放入AQS的阻塞队列。

当其它线程调用了当前线程的interrupt()方法中断了当前线程时,则当前线程抛出IE异常返回。

2. void acquire(int permits)

该方法只需要获取permits个信号量值。

3. void acquireUniterruptibly()

该方法与acquire()方法类似,不同在于该方法对中断不响应。

4. void acquireUniterruptibly(int permits)

与acquire(int permits)方法不一样在于该方法对中断不响应。

5. void release()

该方法作用在于将当前的Semaphore对象的信号量值增加1,如果当前线程因为调用acquire()方法被阻塞而被放入AQS的阻塞队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量。

6. void release(int permits)

与方法release()不同的在于,前者每次调用会在信号量值原来的基础上增加permits,而后者每次增加1。

本文标题:Java并发编程之美

文章作者:Bangjin-Hu

发布时间:2019年10月15日 - 09:22:26

最后更新:2020年03月30日 - 08:10:49

原始链接:http://bangjinhu.github.io/undefined/Java 并发编程之美/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

Bangjin-Hu wechat
欢迎扫码关注微信公众号,订阅我的微信公众号.
坚持原创技术分享,您的支持是我创作的动力.